package jehc.cloud.sentinel.log.task;

import com.alibaba.csp.sentinel.node.metric.MetricNode;
import com.alibaba.fastjson.JSON;
import jehc.cloud.sentinel.util.CollectionUtil;
import jehc.cloud.sentinel.util.FlowRuleUtil;
import jehc.cloud.sentinel.util.KafkaUtils;
import jehc.cloud.sentinel.util.SpringApplicationUtils;
import jehc.cloud.sentinel.vo.MetricVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RecursiveTask;

/**
 *
 */
@Slf4j
public class MetricLogTask extends RecursiveTask<Integer> {
    String appName;
    List<MetricNode> metricNodes;
    int groupNumber;

    public MetricLogTask(String appName, List<MetricNode> metricNodes, int groupNumber){
        this.appName = appName;
        this.metricNodes = metricNodes;
        this.groupNumber = groupNumber;
    }

    @Override
    protected Integer compute() {
        int result=0;
        if(CollectionUtils.isEmpty(metricNodes)){
            return result;
        }
        if(metricNodes.size()<=groupNumber){
            //一个任务即可
            result+=doCallable(appName,metricNodes);
        }else{
            //拆分任务
            List<MetricLogTask> subTaskList =createSubTasks(appName,metricNodes);

            //方法二 采用invokeAll（注意：该效率高一些）
            invokeAll(subTaskList);

            //合并执行结果
            for(MetricLogTask subTask :subTaskList){
                result+=subTask.join();
            }
        }
        return result;
    }

    /**
     *
     * @param metricNodes
     * @return
     */
    private Integer doCallable(String appName,List<MetricNode> metricNodes){
        Integer result = push(appName,metricNodes);
        return result;
    }

    /**
     *
     * @param appName
     * @param metricNodes
     * @return
     */
    private List<MetricLogTask> createSubTasks(String appName,List<MetricNode> metricNodes) {
        List<MetricLogTask> subTasks = new ArrayList<MetricLogTask>();
        Map<String,List<MetricNode>> dataMap = CollectionUtil.groupList(metricNodes,groupNumber);
        Iterator<Map.Entry<String, List<MetricNode>>> entryIterator = dataMap.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Map.Entry<String, List<MetricNode>> e = entryIterator.next();
            List<MetricNode> metricLogTasks = e.getValue();
            MetricLogTask metricLogTask = new MetricLogTask(appName,metricLogTasks,groupNumber);
            subTasks.add(metricLogTask);
        }
        return subTasks;
    }

    /**
     * 推送
     * @param appName
     * @param metricNodes
     * @return
     */
    public Integer push(String appName,List<MetricNode> metricNodes){
        FlowRuleUtil flowRuleUtil = SpringApplicationUtils.getBean(FlowRuleUtil.class);
        KafkaUtils kafkaUtils = SpringApplicationUtils.getBean(KafkaUtils.class);
        Integer result=0;
        if(CollectionUtils.isEmpty(metricNodes)){
            log.info("没有可推送的日志...");
            return 0;
        }
        for(MetricNode metricNode:metricNodes){
            try{
                MetricVo metricVo = new MetricVo();
                metricVo.setAppName(appName);
                convert(metricNode,metricVo);
                String json = JSON.toJSONString(metricVo);
                kafkaUtils.send(flowRuleUtil.getSentinelTopic(), json);
                result++;
            }catch (Exception e){
                log.error("推送失败");
            }
        }
        log.info("推送成功");
        return result;
    }

    /**
     * 转换日志对象
     * @param metricNode
     * @param metricVo
     */
    public void convert(MetricNode metricNode, MetricVo metricVo){
        try {
            metricVo.setBlockQps(metricNode.getBlockQps());
            metricVo.setExceptionQps(metricNode.getExceptionQps());
            metricVo.setOccupiedPassQps(metricNode.getOccupiedPassQps());
            metricVo.setPassQps(metricNode.getPassQps());
            metricVo.setResource(metricNode.getResource());
            metricVo.setRt(metricNode.getRt());
            metricVo.setSuccessQps(metricNode.getSuccessQps());
            metricVo.setTimestamp(metricNode.getTimestamp());
        }catch (Exception e){
            log.error("转换日志异常：{}",e);
        }
    }
}
